---
title: prefect-snowflake
---

The `prefect-snowflake` integration connects your Prefect flows to Snowflake. You can run queries synchronously or asynchronously from within Prefect flows and tasks.

## Getting started

### Prerequisites

- [A Snowflake account](https://www.snowflake.com/en/) and the necessary connection information.

### Installation

Install `prefect-snowflake` as a dependency of Prefect.
If you don't already have Prefect installed, this command also installs the newest version of `prefect`.

```bash
pip install "prefect[snowflake]"
```

Upgrade to the latest versions of `prefect` and `prefect-snowflake`:

```bash
pip install -U "prefect[snowflake]"
```

### Blocks setup

The `prefect-snowflake` integration has two blocks: one for storing credentials and one for storing connection information. Register the blocks in this module so that you can view and edit them in Prefect Cloud:

```bash
prefect block register -m prefect_snowflake
```


#### Create the credentials block

The following example saves a `SnowflakeCredentials` block through code. Log into your Snowflake account to find your credentials.
The example uses a user and password combination. Refer to the [SDK documentation](https://reference.prefect.io/prefect_snowflake/) for a full list of authentication and connection options.

```python
from prefect_snowflake import SnowflakeCredentials

credentials = SnowflakeCredentials(
    account="ACCOUNT-PLACEHOLDER",  # resembles nh12345.us-east-2.snowflake
    user="USER-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER"
)
credentials.save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
```
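
If you would rather not hard-code secrets in the script, one option is to read them from environment variables before building the block. The sketch below is only an illustration: the `SNOWFLAKE_*` variable names and both helper functions are hypothetical, not part of `prefect-snowflake`. It assumes the same `account`, `user`, and `password` fields used above.

```python
import os
from typing import Mapping


def credential_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Collect SnowflakeCredentials keyword arguments from environment variables.

    The SNOWFLAKE_* variable names are hypothetical; use whatever names
    your deployment already defines.
    """
    names = {
        "account": "SNOWFLAKE_ACCOUNT",
        "user": "SNOWFLAKE_USER",
        "password": "SNOWFLAKE_PASSWORD",
    }
    # Fail early with a readable message instead of saving a half-filled block.
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {field: env[var] for field, var in names.items()}


def save_credentials_block(block_name: str) -> None:
    # Imported here so the helper above stays usable without prefect-snowflake.
    from prefect_snowflake import SnowflakeCredentials

    credentials = SnowflakeCredentials(**credential_kwargs_from_env())
    # overwrite=True lets you re-run the script after rotating a password.
    credentials.save(block_name, overwrite=True)
```

Calling `save_credentials_block("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")` with the three variables exported should produce the same block as the example above.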

#### Create the connection block

Then, to create a `SnowflakeConnector` block, find your connection details in Snowflake:

1. After logging in to Snowflake, click on any worksheet.
2. On the left side, select a database and schema.
3. On the top right, select a warehouse.
4. Create a short script, replacing the placeholders below.

```python
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

credentials = SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

connector = SnowflakeConnector(
    credentials=credentials,
    database="DATABASE-PLACEHOLDER",
    schema="SCHEMA-PLACEHOLDER",
    warehouse="COMPUTE_WH",
)
connector.save("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
```

You can now load the saved block, which holds your credentials and connection information:

```python
from prefect_snowflake import SnowflakeConnector

connector = SnowflakeConnector.load("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
```

## Examples

To set up a table, use the `execute` and `execute_many` methods. Then, use the `fetch_all` method to retrieve the rows. If the results are too large to fit into memory, use the `fetch_many` method to retrieve data in chunks.

Using the `SnowflakeConnector` as a context manager ensures that the Snowflake connection and cursors are closed properly when you are done with them.


```python
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            # extend (not append) keeps all_rows a flat list of rows
            all_rows.extend(new_rows)
    return all_rows

@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


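if __name__ == "__main__":
    # snowflake_flow requires the name of a saved SnowflakeConnector block
    snowflake_flow("CONNECTOR-BLOCK-NAME-PLACEHOLDER")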
```
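
When a result set really is too large for memory, collecting every chunk into one list defeats the purpose of `fetch_many`. A minimal sketch of an alternative is a generator that hands each chunk to the caller and then discards it. The `iter_chunks` and `count_customers` helpers are hypothetical and not part of `prefect-snowflake`. The sketch assumes `connector` behaves like an open `SnowflakeConnector`, where repeated `fetch_many` calls with the same operation return the next set of rows.

```python
from typing import Any, Iterator, Optional


def iter_chunks(
    connector: Any,
    operation: str,
    size: int,
    parameters: Optional[dict] = None,
) -> Iterator[list]:
    """Yield successive chunks of rows until the result set is exhausted."""
    while True:
        # Same operation and parameters on every call, so the connector
        # continues from where the previous fetch stopped.
        rows = connector.fetch_many(operation, parameters=parameters, size=size)
        if not rows:
            return
        yield rows


def count_customers(block_name: str) -> int:
    # Imported here so iter_chunks can be exercised without a Snowflake account.
    from prefect_snowflake import SnowflakeConnector

    total = 0
    with SnowflakeConnector.load(block_name) as connector:
        for chunk in iter_chunks(connector, "SELECT * FROM customers", size=2):
            total += len(chunk)  # process the chunk, then let it go
    return total
```

Only one chunk is held at a time, so peak memory depends on `size` rather than on the total number of rows.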

If the block's native methods don't meet your requirements, you can access the underlying Snowflake connection and use its built-in methods directly.

```python
import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas


@flow
def snowflake_write_pandas_flow():
    connector = SnowflakeConnector.load("my-block")
    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        with connection.cursor() as cursor:
            cursor.execute(statement)

        # case sensitivity matters here!
        df = pd.DataFrame([('Marvin', 42), ('Ford', 88)], columns=['NAME', 'NUMBER'])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=connection,
            df=df,
            table_name=table_name,
            database=connector.database,
            schema=connector.schema_  # note the "_" suffix
        )
```

## Resources

Refer to the `prefect-snowflake` [SDK documentation](https://reference.prefect.io/prefect_snowflake/database/) to explore other capabilities of the `prefect-snowflake` library, such as async methods.

For further assistance using Snowflake, consult the [Snowflake documentation](https://docs.snowflake.com/) or the [Snowflake Python Connector documentation](https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-example).

